import boto3
import time
import datetime
import io
import pandas as pd
from botocore.exceptions import ClientError

region = "cn-northwest-1"
access_key = "AKIA2V3QVTGADHOQR7DK"
secret_key = "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"
bucket_output = 'athenaout'

s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
s3_file = boto3.resource('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)


# 检查s3文件是否存在
def s3_file_check(bucket, key):
    try:
        s3_file.Object(bucket, key).load()
    except ClientError as e:
        return int(e.response['Error']['Code']) != 404
    return True

# 查询Athena数据
def get_athena_data_all_db(db,sql):

    response = do_athena_sql_all_db(db,sql)
    file_name = response['QueryExecutionId'] + '.csv'
    key = "res/" + file_name
    # print('key :\n', key)

    obj = None
    try:
        for i in range(180):
            time.sleep(0.1)
            if s3_file_check(bucket_output, key):
                # print('find res file!!!')
                time.sleep(0.1)
                break

        obj = s3.get_object(Bucket=bucket_output, Key=key)['Body'].read()
        # print('结果字符串：\n',io.BytesIO(obj))
        df = pd.read_csv(io.BytesIO(obj))
        # print('Athena return：\n', df)
    except Exception as e:
        print(e)
        return pd.DataFrame

    return df

#执行Athena sql 操作
def do_athena_sql_all_db(db,in_sql):
    # 连接 athena
    athena = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)

    # 通过athena进行数据统计
    # athena统计的结果只能放到S3中，所以需要提供临时存放文件的bucket和其中的output作为存放路径
    # athena计算完后，程序从S3中加载结果文件读取内容，然后放入到数据库中
    s3_output = "s3://athenaout/res/"
    try:
        # 从athena提取数据sql
        response = athena.start_query_execution(
            QueryString=in_sql,
            QueryExecutionContext={
                'Database': db
            },
            ResultConfiguration={
                'OutputLocation': s3_output
            }
        )
    except Exception as e:
        print('athena have error')
        print(e)
        print('-----------------------------------------')

    # print('athena response:\n', response)
    return response



# 根据起始时间，构造分区查询条件
def get_partition_condition(start,end):
    s=datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S")
    e=datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S")

    condition='  '
    if s.year==e.year:
        condition=" and partition_0='%s'"%(str(s.year).zfill(4))
        if s.month==e.month:
            condition=condition+" and partition_1='%s'"%(str(s.month).zfill(2))
            if s.day == e.day:
                condition = condition + " and partition_2='%s'" % (str(s.day).zfill(2))
                if s.hour == e.hour:
                    condition = condition + " and partition_3='%s'" % (str(s.hour).zfill(2))
        else:
            m=''
            for i in range(s.month,e.month+1):
                m=m+"'%s',"%(str(i).zfill(2))

            condition = condition + " and partition_1 in (%s)" % (m[:-1])
    else:
        y = ' '
        for i in range(s.year, e.year + 1):
            y = y + "'%s'," % (str(i).zfill(4))

        condition = condition + " and partition_0 in (%s)" % (y[:-1])



    return condition



